[GLUTEN-1354][CORE] Fix the wrong result when calling `df.describe` (#1355)
JkSelf wants to merge 2 commits into apache:main
Conversation
The root cause is that native columnar-to-row releases the memory once a row has been consumed. That is fine for most non-caching operators, but if the next operator caches the row directly, it breaks because the buffer backing the row has already been released. Fortunately, in Spark most operators copy a row before caching it. That does not hold when the last operation is `df.describe()`, which caches the rows produced by the previous operator directly. The workaround here is to use the JVM columnar-to-row, which allocates rows on the heap so they can be cached safely. In theory this is a bug for vanilla Spark as well: if a previous operator produces rows allocated off-heap, they will likewise be released once consumed.
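The hazard described above can be illustrated without Spark. Below is a minimal Java sketch (class and method names are hypothetical, not Gluten's API): a "row" that is only a view over a shared, recycled buffer goes stale when the producer reuses that buffer, while a defensive heap copy stays valid.

```java
import java.util.ArrayList;
import java.util.List;

public class RowBufferDemo {
    // A "row" that is just a view over a shared, reusable buffer, mimicking
    // how a native columnar-to-row hands out rows backed by memory that is
    // recycled once each row is consumed. Illustrative only.
    static final class RowView {
        final byte[] buffer; // shared backing storage

        RowView(byte[] buffer) { this.buffer = buffer; }

        byte value() { return buffer[0]; }

        // Defensive copy detaches the row from the shared buffer,
        // analogous to allocating the row on the JVM heap.
        byte[] copy() { return buffer.clone(); }
    }

    public static void main(String[] args) {
        byte[] shared = new byte[1];

        List<RowView> cachedViews = new ArrayList<>();
        List<byte[]> cachedCopies = new ArrayList<>();

        for (byte v = 1; v <= 3; v++) {
            shared[0] = v;                 // producer overwrites the same buffer
            RowView row = new RowView(shared);
            cachedViews.add(row);          // unsafe: caches the live view
            cachedCopies.add(row.copy());  // safe: caches a heap copy
        }

        // Every cached view now observes the *last* value written:
        System.out.println(cachedViews.get(0).value()); // prints 3, not 1
        // Copies retain the value each row had when it was produced:
        System.out.println(cachedCopies.get(0)[0]);     // prints 1
    }
}
```

This is why operators that merely stream rows forward are unaffected, while an operator that caches rows directly (as `df.describe()` does via `collect()`) sees corrupted results unless the rows live on the heap or are copied first.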
If we fall back to the JVM ColumnarToRow, will it need a Velox-to-Arrow conversion before it?
This is a bug for Spark 3.2 and 3.3. We have fixed it in PR #40914 for Spark 3.3. We will close this PR, and this issue will be fixed when the Spark version is upgraded to 3.4.
What changes were proposed in this pull request?
When calling the `df.describe()` method, Spark calls `rdd.collect()` to cache the `RDD[UnsafeRow]`. If the last RDD is `GlutenColumnarToRowRDD`, each `UnsafeRow` is released when the next batch is accessed, so `df.describe()` reads released rows, which causes wrong results. This PR falls back to the vanilla Spark ColumnarToRow when ColumnarToRow is the last operator.

Fixes #1354
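The shape of the fix, reduced to its decision logic, can be sketched as follows. This is a hypothetical mini plan model, not Gluten's actual rule or class names: fall back to the JVM columnar-to-row only when the native transition is the root of the plan, because only then may its rows be cached directly by the caller (e.g. `collect()` inside `df.describe()`).

```java
// Hypothetical mini plan model; node names are illustrative, not Gluten's API.
public class FallbackRuleSketch {
    interface PlanNode {}

    // Native columnar-to-row transition: its output rows are backed by
    // memory that is released once each row is consumed.
    static final class NativeColumnarToRow implements PlanNode {
        final PlanNode child;
        NativeColumnarToRow(PlanNode child) { this.child = child; }
    }

    // Any ordinary row-based operator that streams rows onward.
    static final class RowOp implements PlanNode {
        final PlanNode child;
        RowOp(PlanNode child) { this.child = child; }
    }

    static final class Scan implements PlanNode {}

    // Only when the native transition is the *last* operator (the plan root)
    // do we fall back to the JVM implementation, whose rows live on the heap
    // and therefore survive being cached directly.
    static boolean shouldFallbackToJvmC2R(PlanNode root) {
        return root instanceof NativeColumnarToRow;
    }

    public static void main(String[] args) {
        PlanNode last = new NativeColumnarToRow(new Scan());
        PlanNode middle = new RowOp(new NativeColumnarToRow(new Scan()));
        System.out.println(shouldFallbackToJvmC2R(last));   // true
        System.out.println(shouldFallbackToJvmC2R(middle)); // false
    }
}
```

When the native transition sits in the middle of the plan, the downstream operator consumes (and, where needed, copies) each row before the buffer is recycled, so no fallback is required there.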
How was this patch tested?
Added a unit test.